6

Concurrency

Concurrency and one of its manifestations, parallel processing, are among the broadest topics in the area of software engineering. Concurrency is such a huge topic that dozens of books could be written and we would still not be able to discuss all of its important aspects and models. The purpose of this chapter is to show you why concurrency may be required in your application, when to use it, and what Python's most important concurrency models are.

We will discuss some of the language features, built-in modules, and third-party packages that allow you to implement these models in your code. But we won't cover them in much detail. Treat the content of this chapter as an entry point for your own research and reading. We will try to guide you through the basic ideas and help in deciding if you really need concurrency. Hopefully, after reading this chapter you will be able to tell which approach suits your needs best.

In this chapter, we will cover the following topics:

  • What is concurrency?
  • Multithreading
  • Multiprocessing
  • Asynchronous programming

Before we get into the basic concepts of concurrency, let's begin by considering the technical requirements.

Technical requirements

The following are the Python packages that are used in this chapter, which you can download from PyPI:

  • requests
  • aiohttp

Information on how to install packages is included in Chapter 2, Modern Python Development Environments.

The code files for this chapter can be found at https://github.com/PacktPublishing/Expert-Python-Programming-Fourth-Edition/tree/main/Chapter%206.

Before we delve into various implementations of concurrency available to Python programmers, let's discuss what concurrency actually is.

What is concurrency?

Concurrency is often confused with actual methods of implementing it. Some programmers also think that it is a synonym for parallel processing. This is the reason why we need to start by properly defining concurrency. Only then will we be able to properly understand various concurrency models and their key differences.

First and foremost, concurrency is not the same as parallelism. Concurrency is also not a matter of application implementation. Concurrency is a property of a program, algorithm, or problem, whereas parallelism is just one of the possible approaches to problems that are concurrent.

In Leslie Lamport's 1978 paper Time, Clocks, and the Ordering of Events in a Distributed System, he defines the concept of concurrency as follows:

"Two events are concurrent if neither can causally affect the other."

By extrapolating events to programs, algorithms, or problems, we can say that something is concurrent if it can be fully or partially decomposed into components (units) that are order-independent. Such units may be processed independently from each other, and the order of processing does not affect the final result. This means that they can also be processed simultaneously or in parallel. If we process information this way (that is, in parallel), then we are indeed dealing with parallel processing. But this is still not obligatory.

Doing work in a distributed manner, preferably using the capabilities of multicore processors or computing clusters, is a natural consequence of concurrent problems. However, that does not mean it is the only way of efficiently dealing with concurrency. There are plenty of use cases where concurrent problems can be approached in a non-synchronous way without the need for parallel execution. In other words, when a problem is concurrent, it gives you the opportunity to deal with it in a special, preferably more efficient, way.

We often get used to solving problems in a classical way: by performing a sequence of steps. This is how most of us think and process information—using synchronous algorithms that do one thing at a time, step by step. But this way of processing information is poorly suited to solving large-scale problems or satisfying the demands of multiple users or software agents simultaneously, namely in the following situations:

  • When the time to process the job is limited by the performance of the single processing unit (a single machine, CPU core, and so on)
  • When you are not able to accept and process new inputs until your program has finished processing the previous one

These problems create three common application scenarios where concurrent processing is a viable approach to satisfy user needs:

  • Processing distribution: The scale of the problem is so big that the only way to process it in an acceptable time frame (with constrained resources) is to distribute execution on multiple processing units that can handle the work in parallel.
  • Application responsiveness: Your application needs to maintain responsiveness (accept new inputs), even if it has not finished processing previous inputs.
  • Background processing: Not every task needs to be performed in a synchronous way. If there is no need to immediately access the results of a specific action, it may be reasonable to defer execution in time.

The processing distribution scenario directly maps to parallel processing. That's why it is usually solved with multithreading and multiprocessing models. The application responsiveness scenario often doesn't require parallel processing, so the actual solution really depends on the problem details. The problem of application responsiveness also covers the case when the application needs to serve multiple clients (users or software agents) independently, without the need to wait for others to be successfully served.

It is an interesting observation that these groups of problems are not mutually exclusive. Often, you will have to maintain application responsiveness and at the same time won't be able to handle all the inputs on a single processing unit. This is the reason why different and seemingly alternative or conflicting approaches to concurrency may often be used at the same time. This is especially common in the development of web servers, where it may be necessary to use asynchronous event loops, or threads in conjunction with multiple processes, in order to utilize all the available resources and still maintain low latencies under high load.

Python provides several ways to deal with concurrency. These are mainly:

  • Multithreading: This is characterized by running multiple threads of execution that share the memory context of the parent process. It is one of the most popular (and oldest) concurrency models and works best in applications that do a lot of I/O (Input/Output) operations or need to maintain user interface responsiveness. It is fairly lightweight but comes with a lot of caveats and memory safety risks.
  • Multiprocessing: This is characterized by running multiple independent processes to perform work in a distributed manner. It is similar to threads in operation, although it does not rely on a shared memory context. Due to the nature of Python, it is better suited for CPU-intensive applications. It is more heavyweight than multithreading and requires implementing inter-process communication patterns to orchestrate work between processes.
  • Asynchronous programming: This is characterized by running multiple cooperative tasks within a single application process. Cooperative tasks work like threads, although switching between them is facilitated by the application itself instead of the operating system kernel. It is well suited to I/O-bound applications, especially for programs that need to handle multiple simultaneous network connections. The downside of asynchronous programming is the need to use dedicated asynchronous libraries.

The first model we will discuss in detail is multithreading.

Multithreading

Developers often consider multithreading to be a very complex topic. While this is true, Python provides high-level classes and functions that greatly simplify the use of threads. CPython has some inconvenient implementation details that make threads less effective than in other programming languages like C or Java. But that doesn't mean that they are completely useless in Python.

There is still quite a large range of problems that can be solved effectively and conveniently with Python threads.

In this section, we will discuss the limitations of multithreading in CPython, as well as the common concurrent problems for which Python threads are still a viable solution.

What is multithreading?

Thread is short for a thread of execution. A programmer can split their work into threads that run simultaneously. Threads are still bound to the parent process and can easily communicate because they share the same memory context. The execution of threads is coordinated by the OS kernel.

Multithreading benefits from multiprocessor or multicore machines, where each thread can be executed on a separate CPU core, thus making the program run faster. This is a general rule that holds true for most programming languages. In Python, the performance benefit of multithreading on multicore CPUs has some limits, which we will discuss later. For the sake of simplicity, let's assume for now that this statement is also true for Python.

The simplest way to start a new thread of execution in Python is to use the threading.Thread class, as in the following example:

from threading import Thread
def my_function():
    print("printing from thread")
if __name__ == "__main__":
    thread = Thread(target=my_function)
    thread.start()
    thread.join()

The my_function() function is the function we want to execute in the new thread. We pass it to the Thread class constructor as the target keyword argument. Instances of this class are used to encapsulate and control application threads.

Creating a new Thread class instance is not enough to start a new thread. In order to do this, you need to call the start() method. Once the new thread is started, it will run alongside the main thread until the target function finishes. In the above example, we explicitly wait for the extra thread to finish using the join() method.

We say that the join() method is a blocking operation. This means that the calling thread isn't doing anything in particular (it doesn't consume CPU time); it simply waits for a specific event to happen: in this case, the termination of the joined thread.
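As a side note, join() also accepts an optional timeout argument. The following is a minimal sketch (the one-second timeout and the sleeping task are arbitrary illustrative choices) showing how to avoid blocking indefinitely and check whether the thread is still running:

from threading import Thread
import time
def long_task():
    time.sleep(10)
if __name__ == "__main__":
    thread = Thread(target=long_task)
    thread.start()
    # wait at most one second for the thread to finish
    thread.join(timeout=1.0)
    # join() always returns None; is_alive() tells us whether it timed out
    if thread.is_alive():
        print("thread is still running")
    # note: the interpreter will still wait for this non-daemon
    # thread to finish before the program exits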

The start() and join() methods allow you to create and start multiple threads at once. The following is a simple modification of the previous example that starts and joins multiple threads in bulk:

from threading import Thread
def my_function():
    print("printing from thread")
if __name__ == "__main__":
    threads = [Thread(target=my_function) for _ in range(10)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

All threads share the same memory context. This means that you must be extremely careful about how your threads access the same data structures. If two parallel threads update the same variable without any protection, a subtle timing variation in thread execution can alter the final result in an unexpected way. To better understand this problem, let's consider a small program that runs multiple threads reading and updating the same value:

from threading import Thread
thread_visits = 0
def visit_counter():
    global thread_visits
    for i in range(100_000):
        value = thread_visits
        thread_visits = value + 1
if __name__ == "__main__":
    thread_count = 100
    threads = [
        Thread(target=visit_counter)
        for _ in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"{thread_count=}, {thread_visits=}")

The above program starts 100 threads, and each one tries to read and increment the thread_visits variable 100,000 times. If we were to run the tasks sequentially, the final value of the thread_visits variable would be 10,000,000. But threads can interleave and lead to unexpected results. Let's save the above code example in the thread_visits.py file and run it a few times to see the actual results:

$ python3 thread_visits.py
thread_count=100, thread_visits=6859624
$ python3 thread_visits.py
thread_count=100, thread_visits=7234223
$ python3 thread_visits.py
thread_count=100, thread_visits=7194665

On each run, we got a completely different number, and it was always very far from the expected 10,000,000 thread visits. That doesn't mean the threads performed fewer visits; rather, with such a large number of threads, they interleaved between the read and the write operations, so some increments simply overwrote others and were lost.

Such a situation is called a race hazard or race condition. It is one of the most notorious sources of bugs in multithreaded applications. Obviously, there is a slice of time between the read and write operations on the thread_visits variable where another thread can step in and manipulate the result.

One might think that the problem could be fixed using the += operator, which looks like a single atomic operation:

def visit_counter():
    global thread_visits
    for i in range(100_000):
        thread_visits += 1

But that won't help us either! The += operator is just shorthand for incrementing a variable, and it actually takes a few separate operations in the Python interpreter: a read of the current value, an addition, and a write back. Between those operations, there's still time for threads to interleave.
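You can verify this yourself with the built-in dis module, which disassembles a function into interpreter bytecode. The exact opcodes differ between Python versions, but the increment always spans several instructions (a global load, an add, and a global store), and a thread switch can happen between any two of them:

import dis
thread_visits = 0
def visit_counter():
    global thread_visits
    thread_visits += 1
dis.dis(visit_counter)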

The proper way around race conditions is to use thread locking primitives. Python has a few lock classes in the threading module. Here we can use the simplest one—threading.Lock. The following is an example of a thread-safe visit_counter() function:

from threading import Lock
thread_visits = 0
thread_visits_lock = Lock()
def visit_counter():
    global thread_visits
    for i in range(100_000):
        with thread_visits_lock:
            thread_visits += 1

If you run the modified version of the code, you will notice that thread visits with locks are counted properly. But that comes at the expense of performance. The threading.Lock object makes sure that only one thread at a time can execute the protected block of code, so that block cannot run in parallel. Moreover, acquiring and releasing the lock requires some additional work. With a lot of threads contending for the lock, a performance drop will be noticeable. We will see other examples of using locks to secure parallel data access later in the chapter.

Multithreading is usually supported at the OS kernel level. When a machine has a single processor with a single core, the system uses a time slicing mechanism to allow threads to run seemingly in parallel. With time slicing, the CPU switches from one thread to another so fast that there is an illusion of threads running simultaneously.

Single-core CPUs are pretty uncommon these days in desktop computers but can still be a concern in other areas. Small and cheap instances in many cloud compute platforms, as well as low-cost embedded systems, often have only single-core CPUs or virtual CPUs.

Parallelism without multiple processing units is obviously virtual, and the application performance gain on such hardware is harder to evaluate. Still, it is sometimes useful to implement code with threads, even if it means executing on a single core. We will review such use cases later.

Everything changes when your execution environment has multiple processors or multiple processor cores. In such cases, threads can be distributed among CPUs or their cores by the OS kernel. This provides the opportunity to run your program substantially faster. This is true for many programming languages but not necessarily for Python. To understand why that is so, let's take a closer look at how Python deals with threads.

How Python deals with threads

Unlike some other language implementations, CPython maps its interpreter-level threads onto kernel-level threads. Kernel-level threads are operated and scheduled by the OS kernel, and CPython uses OS-specific system calls to create and join threads. The interpreter doesn't have full control over when threads run and on which CPU core they will execute. These responsibilities are left to the sole discretion of the system kernel. Moreover, the kernel can preempt a running thread at any time, for instance, to run a thread with a higher priority.

Unfortunately, the standard implementation of the Python language (the CPython interpreter) comes with a major limitation that renders threads less useful in many contexts. All operations accessing Python objects are serialized by one global lock. This is done because many of the interpreter's internal structures are not thread-safe and need to be protected. Not every operation requires locking, and there are certain situations when threads release the lock.

In the context of parallel processing, if we say that something is serialized, we mean that actions are taken in a serial fashion (one after another). Unintended serialization in concurrent programs is usually something that we want to avoid.

This mechanism of the CPython interpreter is known as the Global Interpreter Lock (GIL). The removal of the GIL is a topic that occasionally appears on the python-dev mailing list and has been postulated by Python developers multiple times. Sadly, at the time of writing, no one has ever managed to provide a reasonable and simple solution that would allow you to get rid of this limitation. It is highly improbable that we will see any progress in this area anytime soon. It is safer to assume that the GIL will stay in CPython, and so we need to learn how to live with it.

So, what is the point of multithreading in Python? When threads contain only pure Python code and don't do any I/O operations (like communicating through sockets), there is little point in using threads to speed up the program. That's because the GIL will most likely globally serialize the execution of all threads. But remember that the GIL cares only about protecting Python objects. In practice, the GIL is released on a number of blocking system calls, like socket calls. It can also be released in sections of C extensions that do not use any Python/C API functions. This means that multiple threads can do I/O operations or execute specifically crafted C extension code completely in parallel.
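The effect of the GIL on pure Python code is easy to observe. The following is a small illustrative benchmark (the loop size is an arbitrary choice) that runs the same CPU-bound function twice sequentially and then in two threads. Under CPython, the threaded variant takes roughly as long as the sequential one, because the GIL prevents the two threads from executing Python bytecode in parallel:

import time
from threading import Thread
def cpu_bound(n=10_000_000):
    # a pure-Python loop; the GIL is held for almost its entire duration
    while n > 0:
        n -= 1
if __name__ == "__main__":
    started = time.perf_counter()
    cpu_bound()
    cpu_bound()
    print(f"sequential: {time.perf_counter() - started:.2f}s")
    started = time.perf_counter()
    threads = [Thread(target=cpu_bound) for _ in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"threaded:   {time.perf_counter() - started:.2f}s")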

We will discuss the details of interacting with the GIL in Python C extensions in Chapter 9, Bridging Python with C and C++.

Multithreading allows you to efficiently utilize time when your program is waiting for an external resource. This is because a sleeping thread that has released the GIL (this happens internally in CPython) can wait on "standby" and "wake up" when the results are back. Lastly, whenever a program needs to provide a responsive interface, multithreading can be an answer, even in single-core environments where the OS needs to use time slicing. With multithreading, the program can easily interact with the user while doing some heavy computing in the background.

Note that the GIL does not exist in every implementation of the Python language. It is a limitation of CPython, Stackless Python, and PyPy, but does not exist in Jython (Python for JVM) and IronPython (Python for .NET). There has also been some development of a GIL-free version of PyPy. It is based on software transactional memory and is called PyPy-STM.

In the next section, we will discuss more specific examples of situations where threading can be useful.

When should we use multithreading?

Despite the GIL limitation, threads can be really useful in some of the following cases:

  • Application responsiveness: Applications that can accept new input and respond within a given time frame (be responsive) even if they have not finished processing previous inputs.
  • Multiuser applications and network communication: Applications that are supposed to accept inputs from multiple users simultaneously often communicate with those users over the network. This means that they can heavily reduce the impact of locking by leveraging those parts of CPython where the GIL is released.
  • Work delegation and background processing: Applications where much of the heavy lifting is done by external applications or services and your code acts as a gateway to those resources.

Let's start with responsive applications, as those are the ones that tend to prefer multithreading over other concurrency models.

Application responsiveness

Let's say you ask your OS to copy a large file from one folder to another through its graphical user interface. The task will possibly be pushed into the background and the interface window will display a constantly refreshed progress status. This way, you get live feedback on the progress of the whole process. You will also be able to cancel the operation. You can also carry out other work like browsing the web or editing your documents while your OS is still copying the file. The graphical user interface of your system will stay responsive to your actions. This is less irritating than a raw cp or copy shell command that does not provide any feedback until the entirety of the work is finished.

A responsive interface also allows a user to work on several tasks at the same time. For instance, GIMP (a popular open-source image editing application) will let you play around with a picture while another one is being filtered, since the two tasks are independent.

When trying to achieve such responsive interfaces, a good approach is to try to push long-running tasks into the background, or at least try to provide constant feedback to the user. The easiest way to achieve that is to use threads. In such a scenario, threads are used to make sure that the user can still operate the interface, even if the application needs to process its tasks for a longer period of time.

This approach is often used together with event-driven programming where the main application thread pushes events to be processed by background worker threads (see Chapter 7, Event-Driven Programming). Web browsers are good examples of applications that often use this architectural pattern.

Do not confuse application responsiveness with Responsive Web Design (RWD). The latter is a popular design approach for web applications that allows them to display well on a variety of media (such as desktop browsers, mobile phones, or tablets).

Multiuser applications

Serving multiple users simultaneously may be understood as a special case of application responsiveness. The key difference is that here the application has to satisfy the parallel inputs of many users, and each one of them may have some expectations about how quickly the application should respond. Simply put, one user should not have to wait for other users' inputs to be processed in order to be served.

Threading is a popular concurrency model for multiuser applications and is extremely common in web applications. For instance, the main thread of a web server may accept all incoming connections but dispatch the processing of every single request to a separate dedicated thread. This usually allows us to handle multiple connections and requests at the same time. The number of connections and requests the application will be able to handle at the same time is only constrained by the ability of the main thread to quickly accept connections and dispatch requests to new threads. A limitation of this approach is that applications using it can quickly consume many resources. Threads are not free: memory is shared but each thread will have at least its own stack allocated. If the number of threads is too large, the total memory consumption can quickly get out of hand.
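A minimal sketch of this pattern, using only the standard library, might look as follows. Here, for simplicity, we dispatch one thread per connection rather than per request, and the echo behavior and the port number are arbitrary illustrative choices:

import socket
from threading import Thread
def handle_client(connection):
    # each client connection is served by its own dedicated thread
    with connection:
        while data := connection.recv(1024):
            connection.sendall(data)  # simply echo the data back
def serve(address=("localhost", 9000)):
    with socket.create_server(address) as server:
        while True:
            # the main thread only accepts connections and dispatches them
            connection, _ = server.accept()
            Thread(target=handle_client, args=(connection,)).start()
if __name__ == "__main__":
    serve()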

Another model of threaded multiuser applications assumes that there is always a limited pool of threads acting as workers that are able to process incoming user inputs. The main thread is then only responsible for allocating and managing the pool of workers. Web applications often utilize this pattern too. A web server, for instance, can create a limited number of threads and each of those threads will be able to accept connections on its own and handle all requests incoming on that connection. This approach usually allows you to serve fewer users at the same time (compared to one thread per request) but gives more control over resource usage. Two very popular Python WSGI-compliant web servers—Gunicorn and uWSGI—allow serving HTTP requests with threaded workers in a way that generally follows this principle.

WSGI stands for Web Server Gateway Interface. It is a common Python standard (defined in PEP 3333, accessible at https://www.python.org/dev/peps/pep-3333/) for communication between web servers and applications that promotes the portability of web applications between web servers. Most modern Python web frameworks and web servers are based on WSGI.

Using multithreading to enable concurrency in multiuser applications is generally less expensive in terms of resources than using multiprocessing. Separate Python processes will use more memory than threads since a new interpreter needs to be loaded for each one of them. On the other hand, having too many threads is expensive too. We know that the GIL isn't such a problem for I/O-intensive applications, but there will always be a time when you will need to execute Python code. Since you cannot parallelize all of the application parts with bare threads, you will never be able to utilize all of the resources on machines with multicore CPUs and a single Python process. This is why the optimal solution is sometimes a hybrid of multiprocessing and multithreading—multiple workers (processes) running with multiple threads. Fortunately, some WSGI-compliant web servers allow for such a setup (for instance, Gunicorn with the gthread worker type).
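For example, assuming a WSGI application object named application in a module named app, such a hybrid setup could be started along these lines (the worker and thread counts are arbitrary illustrative values):

$ gunicorn --workers=4 --threads=2 --worker-class=gthread app:application

This starts four independent processes, each serving requests with two threads, so up to eight requests can be handled at the same time.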

Multiuser applications often utilize the delegation of work to threads as a means of ensuring proper responsiveness for multiple users. But work delegation alone can also be understood as a standalone use case for multithreading.

Work delegation and background processing

If your application depends on many external resources, threads may really help in speeding it up. Let's consider the case of a function that indexes files in a folder and pushes the built indexes into a database. Depending on the type of file, the function executes a different processing routine. For example, one is specialized in PDFs and another one in OpenOffice files.

Instead of processing all files in sequence, your function can set up a single thread for each converter and push jobs to be done to each of them through a queue, as in the sketch below. The overall time taken by the function will be closer to the processing time of the slowest converter than to the total sum of the work.
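The following is a minimal sketch of that idea. The converter functions are hypothetical placeholders, and None is used as a sentinel value telling a worker to quit:

from queue import Queue
from threading import Thread
def convert_pdf(path):
    ...  # hypothetical PDF indexing routine
def convert_odf(path):
    ...  # hypothetical OpenOffice indexing routine
def converter_worker(convert, jobs):
    while True:
        path = jobs.get()
        if path is None:  # sentinel: no more work to do
            break
        convert(path)
if __name__ == "__main__":
    # one queue and one dedicated worker thread per converter type
    queues = {convert_pdf: Queue(), convert_odf: Queue()}
    threads = [
        Thread(target=converter_worker, args=(convert, jobs))
        for convert, jobs in queues.items()
    ]
    for thread in threads:
        thread.start()
    queues[convert_pdf].put("report.pdf")
    queues[convert_odf].put("notes.odt")
    for jobs in queues.values():
        jobs.put(None)  # ask the worker to quit
    for thread in threads:
        thread.join()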

The other common use case for threads is performing multiple network requests to an external service. For instance, if you want to fetch multiple results from a remote web API, it could take a lot of time to do that synchronously, especially if the remote server is located in a distant location.

If you wait for every previous response before making new requests, you will spend a lot of time just waiting for the external service to respond. Additional round-trip time delays will be added to every such request.

If you are communicating with some efficient service (the Google Maps API, for instance), it is highly probable that it can serve most of your requests concurrently without affecting the response times of individual requests. It is then reasonable to perform multiple queries in separate threads. Here, when doing an HTTP request, the application will most likely spend most of its time reading from the TCP socket. Delegating such work to threads can greatly improve your application's performance.

An example of a multithreaded application

To see how Python threading works in practice, let's construct an example application that could benefit from the usage of threads. We will consider a simple problem that was already highlighted in the previous section as a common use case for multithreading: making parallel HTTP requests to some remote service.

Let's say we need to fetch information from some web service using multiple queries that cannot be batched into a single bulk HTTP request. As a realistic example, we will use the foreign exchange reference rates endpoint from a free API, available at https://www.vatcomply.com. The reasons for this choice are as follows:

  • This service is open and does not require any authentication keys.
  • The interface of the service is very simple and can be easily queried using the popular requests library.
  • This API uses a currency data format that is common across many similar APIs. If this service goes down (or stops being free), you will be able to easily switch the base URL of the API to the URL of a different service.

Free API services come and go. It is possible that after some time the URLs in this book won't work or the API will require a paid subscription. In such cases, running your own service may be a good option.

At https://github.com/exchangeratesapi/exchangeratesapi, you can find code for a currency exchange API service that uses the same data format as the API used in this chapter.

In our examples, we will try to obtain exchange rates for selected currencies using multiple currencies as reference points. We will then present the results as an exchange rate currency matrix, similar to the following:

1 USD =    1.0 USD,  0.887 EUR,    3.8 PLN,   8.53 NOK,  22.7 CZK
1 EUR =   1.13 USD,    1.0 EUR,   4.29 PLN,   9.62 NOK,  25.6 CZK
1 PLN =  0.263 USD,  0.233 EUR,    1.0 PLN,   2.24 NOK,  5.98 CZK
1 NOK =  0.117 USD,  0.104 EUR,  0.446 PLN,    1.0 NOK,  2.66 CZK
1 CZK =  0.044 USD,  0.039 EUR,  0.167 PLN,  0.375 NOK,   1.0 CZK

The API we've chosen offers several ways to query for multiple data points within single requests, but unfortunately it does not allow you to query for data using multiple base currencies at once. Obtaining the rate for a single base is as simple as doing the following:

>>> import requests
>>> response = requests.get("https://api.vatcomply.com/rates?base=USD")
>>> response.json()
{'base': 'USD', 'rates': {'BGN': 1.7343265053, 'NZD': 1.4824864769, 'ILS': 3.5777245721, 'RUB': 64.7361000266, 'CAD': 1.3287221779, 'USD': 1.0, 'PHP': 52.0368892436, 'CHF': 0.9993792675, 'AUD': 1.3993970027, 'JPY': 111.2973308504, 'TRY': 5.6802341048, 'HKD': 7.8425113062, 'MYR': 4.0986077858, 'HRK': 6.5923561231, 'CZK': 22.7170346723, 'IDR': 14132.9963642813, 'DKK': 6.6196683515, 'NOK': 8.5297508203, 'HUF': 285.09355325, 'GBP': 0.7655848187, 'MXN': 18.930477964, 'THB': 31.7495787887, 'ISK': 118.6485767491, 'ZAR': 14.0298838344, 'BRL': 3.8548372794, 'SGD': 1.3527533919, 'PLN': 3.8015429636, 'INR': 69.3340427419, 'KRW': 1139.4519819101, 'RON': 4.221867518, 'CNY': 6.7117141084, 'SEK': 9.2444799149, 'EUR': 0.8867606633}, 'date': '2019-04-09'}

In order to keep our examples concise, we will use the requests package to perform HTTP requests. It is not a part of the standard library but can be easily obtained from PyPI using pip.

You can read more about requests at https://requests.readthedocs.io/.

Since our goal is to show how a multithreaded solution of concurrent problems compares to a classical synchronous solution, we will start with an implementation that doesn't use threads at all. Here is the code of a program that loops over the list of base currencies, queries the foreign exchange rates API, and displays the results on standard output as a text-formatted table:

import time
import requests
SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")
def main():
    for base in BASES:
        fetch_rates(base)
if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))

The main() function iterates over a list of base currencies and calls the fetch_rates() function to obtain exchange rates for the base currencies. Inside fetch_rates(), we make a single HTTP request using the requests.get() function. The response.raise_for_status() method will raise an exception if the server returns a response with a status code denoting a server or client error. For now, we don't expect any exceptions and simply assume that, after receiving the response, we can successfully read its payload using the response.json() method. We will discuss how to properly handle exceptions raised within threads in the Dealing with errors in threads section.

We've added a few statements around the execution of the main() function that are intended to measure how much time it took to finish the job. Let's save that code in a file named synchronous.py and execute it to see how it works:

$ python3 synchronous.py

On my computer, it can take a couple of seconds to complete that task:

1 USD =     1.0 USD,   0.823 EUR,    3.73 PLN,     8.5 NOK,    21.5 CZK
1 EUR =    1.22 USD,     1.0 EUR,    4.54 PLN,    10.3 NOK,    26.2 CZK
1 PLN =   0.268 USD,    0.22 EUR,     1.0 PLN,    2.28 NOK,    5.76 CZK
1 NOK =   0.118 USD,  0.0968 EUR,   0.439 PLN,     1.0 NOK,    2.53 CZK
1 CZK =  0.0465 USD,  0.0382 EUR,   0.174 PLN,   0.395 NOK,     1.0 CZK
time elapsed: 4.08s

Every run of our script will always take a different amount of time. This is because the processing time mostly depends on a remote service that's accessible through a network connection. There are many non-deterministic factors affecting the final result. If we wanted to be really methodical, we would make longer tests, repeat them multiple times, and calculate an average from the measurements. But for the sake of simplicity, we won't do that. You will see later that this simplified approach is just enough for illustration purposes.

We have some baseline implementation. Now it is time to introduce threads. In the next section, we will try to introduce one thread per call of the fetch_rates() function.

Using one thread per item

Now, it is time for improvement. We don't do a lot of processing in Python, and long execution times are caused by communication with the external service. We send an HTTP request to the remote server, it calculates the answer, and then we wait until the response is transferred back.

There is a lot of I/O involved, so multithreading seems like a viable option. We can start all the requests at once in separate threads and then just wait until we receive data from all of them. If the service that we are communicating with is able to process our requests concurrently, we should definitely see a performance improvement.

So, let's start with the easiest approach. Python provides clean and easy-to-use abstraction over system threads with the threading module. The core of this standard library is the Thread class, which represents a single thread instance. Here is a modified version of the main() function that creates and starts a new thread for every base currency to process and then waits until all the threads finish:

from threading import Thread
def main():
    threads = []
    for base in BASES:
        thread = Thread(target=fetch_rates, args=[base])
        thread.start()
        threads.append(thread)
    while threads:
        threads.pop().join()

It is a quick and dirty solution that approaches the problem in a bit of a frivolous way. It has some serious issues that we will have to address later. But hey, it works. Let's save the modified script in the one_thread_per_item.py file and run it to see if there is some performance improvement:

$ python3 one_thread_per_item.py

On my computer, I see substantial improvement in total processing time:

1 EUR =    1.22 USD,     1.0 EUR,    4.54 PLN,    10.3 NOK,    26.2 CZK
1 NOK =   0.118 USD,  0.0968 EUR,   0.439 PLN,     1.0 NOK,    2.53 CZK
1 CZK =  0.0465 USD,  0.0382 EUR,   0.174 PLN,   0.395 NOK,     1.0 CZK
1 USD =     1.0 USD,   0.823 EUR,    3.73 PLN,     8.5 NOK,    21.5 CZK
1 PLN =   0.268 USD,    0.22 EUR,     1.0 PLN,    2.28 NOK,    5.76 CZK
time elapsed: 1.34s

Due to the use of print() inside a thread, the output you see may be slightly malformed. This is one of the multithreading problems that we will take care of later in this section.

Once we know that threads have a beneficial effect on our application, it is time to use them in a more logical way. First, we need to identify the following issues in the preceding code:

  • We start a new thread for every parameter. Thread initialization also takes some time, but this minor overhead is not the only problem. Threads also consume other resources, like memory or file descriptors. Our example input has a strictly defined number of items, but what if it did not have a limit? You definitely don't want to run an unbounded number of threads that depends on the arbitrary size of the data input.
  • The fetch_rates() function that's executed in threads calls the built-in print() function, and in practice it is very unlikely that you would want to do that outside of the main application thread. This is mainly due to the way the standard output is buffered in Python. You can experience malformed output when calls to this function from multiple threads interleave. Also, the print() function is considered slow. If used recklessly in multiple threads, it can lead to serialization that will negate all the benefits of multithreading.
  • Last but not least, by delegating every function call to a separate thread, we make it extremely hard to control the rate at which our input is processed. Yes, we want to do the job as fast as possible, but very often, external services enforce hard limits on the rate of requests from a single client that they can process. Sometimes, it is reasonable to design a program in a way that enables you to throttle the rate of processing, so your application won't be blacklisted by external APIs for abusing their usage limits.

In the next section, we will see how to use a thread pool to solve the problem of an unbounded number of threads.

Using a thread pool

The first issue we will try to solve is the unbounded number of threads that are run by our program. A good solution would be to build a pool of threaded workers with a strictly defined size that will handle all the parallel work and communicate with the main thread through some thread-safe data structure. By using this thread pool approach, we will also make it easier to solve two other problems that we mentioned in the previous section.

The general idea is to start a predefined number of threads that will consume the work items from a queue until it becomes empty. When there is no other work to do, the threads will quit, and we will be able to exit from the program. A good candidate for our communication data structure is the Queue class from the built-in queue module. It is a First-In First-Out (FIFO) queue implementation that is very similar to the deque collection from the collections module and was specifically designed to handle inter-thread communication. Here is a modified version of the main() function that starts only a limited number of worker threads with a new worker() function as a target and communicates with them using a thread-safe queue:

from queue import Queue
from threading import Thread
THREAD_POOL_SIZE = 4
def main():
    work_queue = Queue()
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()
    while threads:
        threads.pop().join()

Python has some built-in thread pooling utilities. We will cover them later in the Using multiprocessing.dummy as the multithreading interface section.

The main() function initializes the Queue instance as the work_queue variable and puts all the base currencies in the queue as items of work to be processed by worker threads. It then initializes THREAD_POOL_SIZE threads with the worker() function as a thread target and work_queue as their input argument. It waits until all items have been processed using work_queue.join(), and then waits for all threads to finish by calling the join() method of every Thread instance.

The processing of work items from the queue happens in the worker function. Here is its code:

from queue import Empty
def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            fetch_rates(item)
            work_queue.task_done()

The worker() function runs in a while loop until work_queue.empty() returns True. In every iteration, it tries to obtain a new item in a non-blocking fashion using the work_queue.get_nowait() method. If the queue is already empty, it will raise an Empty exception, and our function will break the loop and finish. If there is an item to pick from the queue, the worker() function will pass it to fetch_rates(item) and mark the item as processed using work_queue.task_done(). When all items from the queue have been marked as done, the work_queue.join() function from the main thread will return.

The rest of the script, namely the fetch_rates() function and the code under the if __name__ == "__main__" clause, stays the same. The following is the full script that we can save in the thread_pool.py file:

import time
from queue import Queue, Empty
from threading import Thread
import requests
THREAD_POOL_SIZE = 4
SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")
def worker(work_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            fetch_rates(item)
            work_queue.task_done()
def main():
    work_queue = Queue()
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()
    while threads:
        threads.pop().join()
if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))

We can now execute the script to see if there is any performance difference from the previous attempt:

$ python thread_pool.py

On my computer, I can see the following output:

1 NOK =   0.118 USD,  0.0968 EUR,   0.439 PLN,     1.0 NOK,    2.53 CZK
1 PLN =   0.268 USD,    0.22 EUR,     1.0 PLN,    2.28 NOK,    5.76 CZK
1 USD =     1.0 USD,   0.823 EUR,    3.73 PLN,     8.5 NOK,    21.5 CZK
1 EUR =    1.22 USD,     1.0 EUR,    4.54 PLN,    10.3 NOK,    26.2 CZK
1 CZK =  0.0465 USD,  0.0382 EUR,   0.174 PLN,   0.395 NOK,     1.0 CZK
time elapsed: 1.90s

The overall runtime may be slower than when using one thread per argument, but at least now it is not possible to exhaust all the computing resources with an arbitrarily long input. Also, we can tweak the THREAD_POOL_SIZE parameter for a better resource/time balance.

In this attempt, we used an unmodified version of the fetch_rates() function that outputs the API result on the standard output directly from within the thread. In some cases, this may lead to malformed output when two threads attempt to print results at the same time. In the next section, we will try to improve it by introducing two-way queues.

Using two-way queues

The issue that we are now able to solve is the potentially problematic printing of the output in threads. It would be much better to leave such a responsibility to the main thread that started the worker threads. We can handle that by providing another queue that will be responsible for collecting results from our workers. Here is the complete code that puts everything together, with the main changes highlighted:

import time
from queue import Queue, Empty
from threading import Thread
import requests
SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
THREAD_POOL_SIZE = 4
def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates
def present_result(base, rates):
    rates_line = ", ".join([
        f"{rates[symbol]:7.03} {symbol}" 
        for symbol in SYMBOLS
    ])
    print(f"1 {base} = {rates_line}")
def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            results_queue.put(fetch_rates(item))
            work_queue.task_done()
def main():
    work_queue = Queue()
    results_queue = Queue()
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(
            target=worker, 
            args=(work_queue, results_queue)
        ) for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()
    while threads:
        threads.pop().join()
    while not results_queue.empty():
        present_result(*results_queue.get())
if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed)) 

The main difference is the introduction of the results_queue instance of the Queue class and the present_result() function. The fetch_rates() function no longer prints its results to standard output. It instead returns the processed API results straight to the worker() function. Worker threads pass those results unmodified through the new results_queue output queue.

Now only the main thread is responsible for printing the results on standard output. After all the work has been marked as done, the main() function consumes results from results_queue and passes them to the present_result() function.

This eliminates the risk of malformed output that we could encounter when multiple threads call print() at the same time. We don't expect any performance improvement from this approach with small inputs, but we have also reduced the risk of thread serialization due to slow print() execution.

In all the previous examples, we've assumed that the API we use will always respond with a meaningful and valid answer. We didn't cover any failure scenarios, to keep things simple, but in real applications it could be a problem. In the next section, we will see what happens when an exception is raised within a thread and how it affects communication over queues.

Dealing with errors in threads

The raise_for_status() method of the requests.Response object will raise an exception if the HTTP response has a status code indicating the error condition. We have used that method in all the previous iterations of the fetch_rates() function but we haven't handled any potential exceptions yet.

If the service we are calling with the requests.get() method responds with a status code indicating an error, the exception will be raised in a separate thread and will not crash the entire program. The worker thread will, of course, exit immediately. But the main thread will wait for all tasks stored on work_queue to finish (with the work_queue.join() call). Without further improvement, we may end up in a situation where some of the worker threads have crashed and the program will never exit. To avoid this, we should ensure that our worker threads gracefully handle possible exceptions and make sure that all items from the queue are processed.

Let's make some minor changes to our code in order to be prepared for any issues that may occur. In case there are exceptions in the worker thread, we may put an error instance on the results_queue queue so the main thread will be able to tell which of the tasks have failed to process. We can also mark the current task as done, the same as we would do if there was no error. That way, we make sure that the main thread won't lock indefinitely while waiting on the work_queue.join() method call.

The main thread might then inspect the results and re-raise any of the exceptions found on the results queue. Here are the improved versions of the worker() and main() functions that can deal with exceptions in a safer way (the changes are highlighted):

def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        
        try:
            result = fetch_rates(item)
        except Exception as err:
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()
def main():
    work_queue = Queue()
    results_queue = Queue()
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]
    for thread in threads:
        thread.start()
    work_queue.join()
    while threads:
        threads.pop().join()
    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result
        present_result(*result)

To see how error handling works in action, we will try to simulate a convincing error scenario. Since we don't have full control over the API we use, we will randomly inject error responses into the fetch_rates() function. The following is the modified version of that function:

import random
def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    if random.randint(0, 5) < 1:
        # simulate error by overriding status code
        response.status_code = 500
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates

By modifying response.status_code to 500, we will simulate the situation of our API returning a response indicating a server error. This is a common status code for issues occurring on the server side. In such situations, details of the error are not always disclosed. This status code is just enough for the response.raise_for_status() method to raise an exception.

Let's save a modified version of the code in the error_handling.py file and run it to see how it handles exceptions:

$ python3 error_handling.py

Errors are injected randomly, so this may need to be executed a few times. After a couple of tries, you should see an output similar to the following:

1 PLN =   0.268 USD,    0.22 EUR,     1.0 PLN,    2.28 NOK,    5.76 CZK
Traceback (most recent call last):
  File ".../error_handling.py", line 92, in <module>
    main()
  File ".../error_handling.py", line 85, in main
    raise result
  File ".../error_handling.py", line 53, in worker
    result = fetch_rates(item)
  File ".../error_handling.py", line 30, in fetch_rates
    response.raise_for_status()
  File ".../.venv/lib/python3.9/site-packages/requests/models.py", line 943, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 500 Server Error: OK for url: https://api.vatcomply.com/rates?base=NOK

Our code did not succeed in obtaining all the items, but we at least got clear information about the error cause, which was in this case a 500 Server Error response status.

In the next section, we will make the last improvement to our multithreaded program. We will introduce a throttling mechanism to protect our program from rate limiting and avoid the accidental abuse of the free service we use.

Throttling

The last of the issues mentioned in the Using one thread per item section that we haven't tackled yet is potential rate limits that may be imposed by external service providers. In the case of the foreign exchange rates API, the service maintainer did not inform us about any rate limits or throttling mechanisms. But many services (even paid ones) often do impose rate limits.

Usually, when a service has rate limits implemented, it will start returning responses indicating errors after a certain number of requests are made, surpassing the allocated quota. We've already prepared for error responses in the previous section, but that is often not enough to properly handle rate limits. That's because many services often count requests made beyond the limit, and if you go beyond the limit consistently, you may never get back to the allocated quota.

When using multiple threads, it is very easy to exhaust any rate limit or simply—if the service does not throttle incoming requests—saturate the service to the level that it will not be able to respond to anyone. If done on purpose, this is known as a Denial-of-Service (DoS) attack.

In order to not go over the rate limits or cause an accidental DoS, we need to limit the pace at which we make requests to the remote service. Limiting the pace of work is often called throttling. There are a few packages on PyPI that allow you to limit the rate of any kind of work and that are really easy to use. But we won't use any external code here. Throttling is a good opportunity to introduce some locking primitives for threading, so we will try to build a throttling solution from scratch.

The algorithm we will use is sometimes called a token bucket and is very simple. It includes the following functionality:

  • There is a bucket with a predefined number of tokens
  • Each token corresponds to a single permission to process one item of work
  • Each time the worker asks for one or more tokens (permissions), we do the following:
    1. We check how much time has passed since the last time we refilled the bucket
    2. If the time difference allows for it, we refill the bucket with the number of tokens that correspond to the time difference
    3. If the number of stored tokens is bigger than or equal to the amount requested, we decrease the number of stored tokens by that amount and return it
    4. If the number of stored tokens is less than requested, we return zero

The two important things are to always initialize the token bucket with zero tokens and to never allow it to overfill. This may be counter-intuitive, but if we don't follow these precautions, we can release tokens in bursts that exceed the rate limit. Because, in our situation, the rate limit is expressed in requests per second, we don't need to deal with arbitrary quanta of time. We assume that the base for our measurement is one second, so we will never store more tokens than the number of requests allowed for that amount of time. Here is an example implementation of the class that allows for throttling with the token bucket algorithm:

from threading import Lock 
 
class Throttle: 
    def __init__(self, rate): 
        self._consume_lock = Lock() 
        self.rate = rate 
        self.tokens = 0 
        self.last = None
 
    def consume(self, amount=1): 
        with self._consume_lock: 
            now = time.time() 
             
            # time measurement is initialized on first 
            # token request to avoid initial bursts 
            if self.last is None: 
                self.last = now 
 
            elapsed = now - self.last 
 
            # make sure that quant of passed time is big 
            # enough to add new tokens 
            if elapsed * self.rate > 1: 
                self.tokens += elapsed * self.rate
                self.last = now 
 
            # never over-fill the bucket 
            self.tokens = min(self.rate, self.tokens)
 
            # finally dispatch tokens if available 
            if self.tokens >= amount: 
                self.tokens -= amount
                return amount
            
            return 0

The usage of this class is very simple. We have to create only one instance of Throttle (for example, Throttle(10)) in the main thread and pass it to every worker thread as a positional argument:

def main():
    work_queue = Queue()
    results_queue = Queue()
    throttle = Throttle(10)
    for base in BASES:
        work_queue.put(base)
    threads = [
        Thread(
            target=worker, 
            args=(work_queue, results_queue, throttle)
        ) for _ in range(THREAD_POOL_SIZE)
    ]
    ...

This throttle instance will be shared across threads, but it is safe to use because we guarded the manipulation of its internal state with the instance of the Lock class from the threading module. We can now update the worker() function implementation to wait with every item until the throttle object releases a new token, as follows:

import time
def worker(work_queue, results_queue, throttle): 
    while True: 
        try: 
            item = work_queue.get_nowait() 
        except Empty: 
            break 
        
        while not throttle.consume():
            time.sleep(0.1) 
 
        try:
            result = fetch_rates(item)
        except Exception as err:
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            work_queue.task_done()

The while not throttle.consume() block prevents us from processing work queue items if a throttle object does not release any tokens (zero evaluates to False). We've put a short sleep to add some pacing for the threads in the event of an empty bucket. There's probably a more elegant way to do that, but this simple technique does the job fairly well.

When throttle.consume() returns a non-zero value, we consider the token consumed. The thread can exit the while loop and proceed with processing the work queue item. When the processing is done, it will read another item from the work queue and again try to consume the token. This whole process will continue until the work queue is empty.
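
To convince ourselves that the throttle behaves as described, here is a quick sketch of it used in isolation (it assumes the Throttle class and the time import from above; the rate of 10 tokens per second is arbitrary):

throttle = Throttle(10)

# the bucket is initialized empty, so the first call yields nothing
assert throttle.consume() == 0

time.sleep(0.2)
# after roughly 0.2 s at 10 tokens/s, about 2 tokens have accrued,
# so a single-token request now succeeds
assert throttle.consume() == 1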

This was a very brief introduction to threads. We haven't covered every possible aspect of multithreaded applications, but we already know enough to take a look at other concurrency models and see how they compare to threads. The next concurrency model will be multiprocessing.

Multiprocessing

Let's be honest, multithreading is challenging. Dealing with threads in a sane and safe manner requires a tremendous amount of code when compared to the synchronous approach. We had to set up a thread pool and communication queues, gracefully handle exceptions from threads, and also worry about thread safety when trying to provide a rate limiting capability. Dozens of lines of code are needed just to execute one function from an external library in parallel! And we rely on the promise from the external package creator that their library is thread-safe. Sounds like a high price for a solution that is practically applicable only to I/O-bound tasks.

An alternative approach that allows you to achieve parallelism is multiprocessing. Separate Python processes that do not constrain each other with the GIL allow for better resource utilization. This is especially important for applications running on multicore processors that are performing really CPU-intensive tasks. Right now, this is the only built-in concurrent solution available to Python developers (using the CPython interpreter) that lets you benefit from multiple processor cores in every situation.

The other advantage of using multiple processes over threads is the fact that they do not share a memory context. Thus, it is harder to corrupt data and introduce deadlocks or race conditions in your application. Not sharing the memory context means that you need some additional effort to pass the data between separate processes, but fortunately there are many good ways to implement reliable inter-process communication. In fact, Python provides some primitives that make communication between processes almost as easy as it is between threads.

The most basic way to start new processes in any programming language is usually by forking the program at some point. On POSIX and POSIX-like systems (like UNIX, macOS, and Linux), a fork is a system call that creates a new child process. In Python, it is exposed through the os.fork() function. After the fork, both processes continue running the program independently. Here is an example script that forks itself exactly once:

import os 
pid_list = [] 
def main(): 
    pid_list.append(os.getpid()) 
    child_pid = os.fork() 
 
    if child_pid == 0: 
        pid_list.append(os.getpid()) 
        print() 
        print("CHLD: hey, I am the child process") 
        print("CHLD: all the pids I know %s" % pid_list) 
 
    else: 
        pid_list.append(os.getpid()) 
        print() 
        print("PRNT: hey, I am the parent process") 
        print("PRNT: the child pid is %d" % child_pid) 
        print("PRNT: all the pids I know %s" % pid_list) 
 
 
if __name__ == "__main__": 
    main()

The os.fork() function spawns a new process. Both processes have the same memory state up to the moment of the fork() call, but after that call their memories diverge, hence the name fork. os.fork() returns an integer value. If it is 0, we know that the current process is the child process. The parent process receives the Process ID (PID) number of its child process.

Let's save the script in the forks.py file and run it in a shell session:

$ python3 forks.py

On my computer, I've got the following output:

PRNT: hey, I am the parent process
PRNT: the child pid is 9304
PRNT: all the pids I know [9303, 9303]
CHLD: hey, I am the child process
CHLD: all the pids I know [9303, 9304]

Notice how both processes have exactly the same initial state of their data before the os.fork() call. They both have the same PID as the first value of the pid_list collection.

Later, both states diverge. We can see that the child process added the 9304 value while the parent duplicated its 9303 PID. This is because the memory contexts of these two processes are not shared. They have the same initial conditions but cannot affect each other after the os.fork() call.

After the fork, each process gets its own address space. To communicate, processes need to work with system-wide resources or use low-level tools like signals.
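
For illustration, here is a minimal, POSIX-only sketch of such low-level communication that passes raw bytes from the forked child to the parent through an os.pipe() file descriptor pair (the message itself is arbitrary):

import os

read_fd, write_fd = os.pipe()
child_pid = os.fork()

if child_pid == 0:
    # child: close the unused read end and send raw bytes
    os.close(read_fd)
    os.write(write_fd, b"hello from the child")
    os._exit(0)
else:
    # parent: close the unused write end, read, and reap the child
    os.close(write_fd)
    print(os.read(read_fd, 1024))
    os.waitpid(child_pid, 0)

Note that, unlike the Pipe class from the multiprocessing module discussed later, a raw pipe transports only bytes, so any serialization is up to you.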

Unfortunately, os.fork() is not available under Windows, where a new interpreter needs to be spawned in order to mimic the fork feature. The multiprocessing implementation therefore depends on the platform. The os module also exposes functions that allow you to spawn new processes under Windows. Python provides the great multiprocessing module, which creates a high-level interface for multiprocessing.
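
As a small illustration of that platform dependence, the multiprocessing module lets you pick the process start method explicitly. Here is a minimal sketch (fork is the default on most POSIX systems, while spawn is the default on Windows and, since Python 3.8, on macOS):

import multiprocessing

if __name__ == "__main__":
    # request the portable "spawn" start method explicitly
    # instead of relying on the platform-specific default
    multiprocessing.set_start_method("spawn")
    print(multiprocessing.get_start_method())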

The great advantage of the multiprocessing module is that it provides some of the abstractions that we had to code from scratch when we discussed multithreading. It allows you to limit the amount of boilerplate code, so it improves application maintainability and reduces complexity. Surprisingly, despite its name, the multiprocessing module exposes an interface similar to that of the threading module, so you will probably want to use the same interface for both approaches.

Let's take a closer look at the built-in multiprocessing module in the next section.

The built-in multiprocessing module

The multiprocessing module provides a portable way to work with processes as if they were threads. This module contains a Process class that is very similar to the Thread class, and can be used on any platform, as follows:

from multiprocessing import Process
import os
def work(identifier):
    print(
        f'Hey, I am the process '
        f'{identifier}, pid: {os.getpid()}'
    )
def main():
    processes = [
        Process(target=work, args=(number,))
        for number in range(5)
    ]
    for process in processes:
        process.start()
    while processes:
        processes.pop().join()
if __name__ == "__main__":
    main()

The Process class has start() and join() methods that are similar to the methods in the Thread class. The start() method spawns a new process and join() waits until the child process exits.

Let's save that script in a file called basic_multiprocessing.py and execute it to see how it works in action:

$ python3 basic_multiprocessing.py

On your own computer, you will be able to see output similar to the following:

Hey, I am the process 3, pid: 9632
Hey, I am the process 1, pid: 9630
Hey, I am the process 2, pid: 9631
Hey, I am the process 0, pid: 9629
Hey, I am the process 4, pid: 9633

When processes are created, the memory is forked (on POSIX and POSIX-like systems). Besides the memory state that is copied, the Process class also provides an extra args argument in its constructor so that data can be passed along.

Communication between processes requires some additional work because their local memory is not shared by default. To ease this, the multiprocessing module provides the following few ways of communicating between processes:

  • Using the multiprocessing.Queue class, which is a functional equivalent of queue.Queue, which was used earlier for communication between threads.
  • Using multiprocessing.Pipe, which is a socket-like two-way communication channel.
  • Using the multiprocessing.sharedctypes module, which allows you to create arbitrary C types (from the ctypes module) in a dedicated pool of memory that is shared between processes.

The multiprocessing.Queue and queue.Queue classes have the same interface. The only difference is that the first is designed for usage in multiprocess environments, rather than with multiple threads, so it uses different internal transports and locking primitives. We've already seen how to use Queue with multithreading in the Multithreading section, so we won't do the same for multiprocessing. The usage stays exactly the same, so such an example would not bring anything new.

A more interesting communication pattern is provided by the Pipe class. It is a duplex (two-way) communication channel that is very similar in concept to UNIX pipes. The interface of Pipe is also very similar to that of a simple socket from the built-in socket module. The difference from raw system pipes and sockets is that Pipe automatically applies object serialization through the pickle module. From a developer's perspective, it looks like sending ordinary Python objects. With plain system pipes or sockets, you need to apply your own serialization manually in order to reconstruct sent objects from byte streams.

The pickle module can easily serialize and deserialize Python objects to and from byte streams. It handles various types of objects including instances of user-defined classes. You can learn more about the pickle module and which objects are picklable at https://docs.python.org/3/library/pickle.html.
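
If you have never used pickle directly, the following minimal round trip shows the essence of it (the sample dictionary is arbitrary):

import pickle

data = {"base": "USD", "rates": {"EUR": 0.835, "PLN": 3.81}}

payload = pickle.dumps(data)      # serialize the object to bytes
restored = pickle.loads(payload)  # reconstruct it from those bytes

assert restored == data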

This allows for much easier communication between processes because you can send almost any Python object. Consider the following worker() function, which reads objects from the Pipe object and outputs their representations on standard output:

def worker(connection):
    while True:
        instance = connection.recv()
        # None is used as a sentinel value that ends the loop
        if instance is None:
            break
        print(f"CHLD: recv: {instance}")

Later on, we can use the Pipe in our main() function to send various objects (including custom classes) to a child process:

from multiprocessing import Process, Pipe 
 
class CustomClass: 
    pass 
 
def main():
    parent_conn, child_conn = Pipe()
    child = Process(target=worker, args=(child_conn,))
    for item in (
        42,
        'some string',
        {'one': 1},
        CustomClass(),
        None,
    ):
        print(f"PRNT: send: {item}")
        parent_conn.send(item)
    child.start()
    child.join() 
if __name__ == "__main__": 
    main()

When looking at the following example output of the preceding script, you will see that you can easily pass custom class instances and that they have different addresses, depending on the process:

PRNT: send: 42
PRNT: send: some string
PRNT: send: {'one': 1}
PRNT: send: <__main__.CustomClass object at 0x101cb5b00>
PRNT: send: None
CHLD: recv: 42
CHLD: recv: some string
CHLD: recv: {'one': 1}
CHLD: recv: <__main__.CustomClass object at 0x101cba400>

The other way to share a state between processes is to use raw types in a shared memory pool with classes provided in multiprocessing.sharedctypes. The most basic ones are Value and Array. Here is some example code from the official documentation of the multiprocessing module:

from multiprocessing import Process, Value, Array 
def f(n, a): 
    n.value = 3.1415927 
    for i in range(len(a)): 
        a[i] = -a[i] 
 
 
if __name__ == '__main__': 
    num = Value('d', 0.0) 
    arr = Array('i', range(10)) 
 
    p = Process(target=f, args=(num, arr)) 
    p.start() 
    p.join() 
 
    print(num.value) 
    print(arr[:])

And this example will print the following output:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

When working with multiprocessing.sharedctypes, you need to remember that you are dealing with shared memory, so to avoid the risk of race conditions, you still need to use locking primitives. multiprocessing provides classes similar to those available in the threading module, such as Lock, RLock, and Semaphore. The downside of the classes from sharedctypes is that they allow you to share only the basic C types from the ctypes module. If you need to pass more complex structures or class instances, you need to use Queue, Pipe, or another inter-process communication channel instead. In most cases, it is reasonable to avoid types from sharedctypes because they increase code complexity and bring all the dangers of multithreading.
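
If you do need them, here is a minimal sketch of the required locking: a shared counter incremented from several processes, guarded by the lock that every synchronized Value carries (accessible through its get_lock() method; the counts are arbitrary):

from multiprocessing import Process, Value

def increment(counter):
    for _ in range(10_000):
        # += on a shared Value is not atomic, so we guard it
        # with the lock associated with the shared memory block
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = Value("i", 0)
    workers = [
        Process(target=increment, args=(counter,))
        for _ in range(4)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(counter.value)  # reliably prints 40000 thanks to the lock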

We've already mentioned that the multiprocessing module allows you to reduce the amount of boilerplate thanks to some extra functionalities. One such functionality is built-in process pools. We will take a look at how to use them in the next section.

Using process pools

Using multiple processes instead of threads adds some overhead. Mostly, it increases the memory footprint because each process has its own, independent memory context. This means that allowing an unbounded number of child processes may be more of an issue than allowing an unbounded number of threads in multithreaded applications.

If the OS supports the fork() system call with copy-on-write (COW) semantics, the memory overhead of starting new subprocesses will be greatly reduced. COW allows an OS to deduplicate the same memory pages and copy them only if one of the processes attempts to modify them. For instance, Linux provides the fork() system call with COW semantics but Windows does not. Also, COW benefits may be diminished in long-running processes.

The best pattern to control resource usage in applications that rely on multiprocessing is to build a process pool in a similar way to what we described for threads in the Using a thread pool section.

And the best thing about the multiprocessing module is that it provides a ready-to-use Pool class that handles all the complexity of managing multiple process workers for you. This pool implementation greatly reduces the amount of required boilerplate and the number of issues related to two-way communication. You also don't have to use the join() method manually, because Pool can be used as a context manager (using the with statement). Here is one of our previous threading examples, rewritten to use the Pool class from the multiprocessing module:

import time
from multiprocessing import Pool
import requests
SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
POOL_SIZE = 4
def fetch_rates(base):
    response = requests.get(
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates
def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")
def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_rates, BASES)
    for result in results:
        present_result(*result)
if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))

As you can see, dealing with the worker pool is now much simpler, as we don't have to maintain our own work queues or call the start()/join() methods manually. The code is now easier to maintain and debug if issues arise. Actually, the only part of the code that explicitly deals with multiprocessing is the main() function:

def main():
    with Pool(POOL_SIZE) as pool:
        results = pool.map(fetch_rates, BASES)
    for result in results:
        present_result(*result)

We no longer have to deal with explicit queues for passing results and we don't have to wonder what happens when one of the subprocesses raises an exception. This is a great improvement on the situation where we had to build the worker pool from scratch. Now, we don't even need to care about communication channels because they are created implicitly inside the Pool class implementation.
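
In case you are wondering what actually happens when a worker raises an exception: pool.map() simply re-raises it in the parent process. Here is a minimal, self-contained sketch of that behavior (the may_fail() function and its failing condition are made up for illustration):

from multiprocessing import Pool

def may_fail(n):
    if n == 3:
        raise ValueError(f"bad input: {n}")
    return n * n

if __name__ == "__main__":
    with Pool(4) as pool:
        try:
            print(pool.map(may_fail, range(5)))
        except ValueError as err:
            # the worker's exception resurfaces here in the parent
            print(f"caught: {err}")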

This doesn't mean that multithreading always needs to be troublesome. Let's take a look at how to use multiprocessing.dummy as a multithreading interface in the next section.

Using multiprocessing.dummy as the multithreading interface

The high-level abstractions from the multiprocessing module, such as the Pool class, provide great advantages over the simple tools provided in the threading module. But this does not mean that multiprocessing is always better than multithreading. There are a lot of use cases where threads may be a better solution than processes. This is especially true for situations where low latency and/or high resource efficiency are required.

Still, it does not mean that you need to sacrifice all the useful abstractions from the multiprocessing module whenever you want to use threads instead of processes. There is the multiprocessing.dummy module, which replicates the multiprocessing API but uses multiple threads instead of forking/spawning new processes.

This allows you to reduce the amount of boilerplate in your code and also gives you a more pluggable code structure. For instance, let's take yet another look at our main() function from the previous section. We could give the user control over which processing backend to use (processes or threads) simply by swapping the class used to construct the pool, as follows:

from multiprocessing import Pool as ProcessPool 
from multiprocessing.dummy import Pool as ThreadPool 
 
 
def main(use_threads=False): 
    if use_threads: 
        pool_cls = ThreadPool 
    else: 
        pool_cls = ProcessPool 
 
    with pool_cls(POOL_SIZE) as pool: 
        results = pool.map(fetch_rates, BASES) 
 
    for result in results: 
        present_result(*result)

The dummy threading pool can also be imported from the multiprocessing.pool module as the ThreadPool class. It will have the same implementation; the actual import path is just a matter of personal preference.
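
For instance, this short sketch uses that alternative import path (the pool size and workload are arbitrary):

from multiprocessing.pool import ThreadPool

with ThreadPool(4) as pool:
    print(pool.map(len, ["a", "bb", "ccc"]))  # prints [1, 2, 3]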

This aspect of the multiprocessing module shows that multiprocessing and multithreading have a lot in common. They both rely on the OS to facilitate concurrency. They also can be operated in a similar fashion and often utilize similar abstractions to ensure communication or memory safety.

A completely different approach to concurrency is asynchronous programming, which does not rely on any OS capabilities to ensure the concurrent processing of information. Let's take a look at this model of concurrency in the next section.

Asynchronous programming

Asynchronous programming has gained a lot of traction in the last few years. In Python 3.5, we finally got some syntax features that solidified the concepts of asynchronous execution. But this does not mean that asynchronous programming wasn't possible before Python 3.5. A lot of libraries and frameworks appeared much earlier, and most of them have their origins in the days of Python 2. There is even a whole alternative implementation of Python called Stackless Python that concentrates on this single programming approach.

The easiest way to think about asynchronous programming in Python is to imagine something similar to threads, but without system scheduling involved. This means that an asynchronous program can concurrently process information, but the execution context is switched internally and not by the system scheduler.

But, of course, we don't use threads to concurrently handle the work in an asynchronous program. Many asynchronous programming solutions use different kinds of concepts and, depending on the implementation, they are named differently. The following are some example names that are used to describe such concurrent program entities:

  • Green threads or greenlets (greenlet, gevent, or eventlet projects)
  • Coroutines (Python 3.5 native asynchronous programming)
  • Tasklets (Stackless Python)

The name green threads comes from the original threads library for the Java language implemented by The Green Team at Sun Microsystems. Green threads were introduced in Java 1.1 and abandoned in Java 1.3.

These are mainly the same concepts but often implemented in slightly different ways.

For obvious reasons, in this section, we will concentrate only on coroutines that are natively supported by Python, starting from version 3.5.

Cooperative multitasking and asynchronous I/O

Cooperative multitasking is at the core of asynchronous programming. In this style of computer multitasking, it's not the responsibility of the OS to initiate a context switch (to another process or thread). Instead, every process voluntarily releases the control when it is idle to enable the simultaneous execution of multiple programs. This is why it is called cooperative multitasking. All processes need to cooperate in order to multitask smoothly.

This model of multitasking was sometimes employed in the OS, but now it is hardly found as a system-level solution. This is because there is a risk that one poorly designed service might easily break the whole system's stability. Thread and process scheduling with context switches managed directly by the OS is now the dominant approach for concurrency at the OS level. But cooperative multitasking is still a great concurrency tool at the application level.

When doing cooperative multitasking at the application level, we do not deal with threads or processes that need to release control because all the execution is contained within a single process and thread. Instead, we have multiple tasks (coroutines, tasklets, or green threads) that release the control to the single function that handles the coordination of tasks. This function is usually some kind of event loop.

To avoid confusion later (due to Python terminology), from now on, we will refer to such concurrent tasks as coroutines.

The most important problem in cooperative multitasking is when to release the control. In most asynchronous applications, the control is released to the scheduler or event loop on I/O operations. It doesn't matter if the program reads data from the filesystem or communicates through a socket, as such I/O operations always result in some waiting time when the process becomes idle. The waiting time depends on the external resource, so it is a good opportunity to release the control so that other coroutines can do their work until they too would need to wait.

This makes such an approach somewhat similar in behavior to how multithreading is implemented in Python. We know that the GIL serializes Python threads, but it is also released on every I/O operation. The main difference is that threads in Python are implemented as system-level threads so that the OS can preempt the currently running thread and give control to the other one at any point in time. In asynchronous programming, tasks are never preempted by the main event loop and must instead return control explicitly. That's why this style of multitasking is also called non-preemptive multitasking. This reduces time lost on context switching and plays better with CPython's GIL implementation.

Of course, every Python application runs on an OS where other processes compete for resources. This means that the OS always has the right to preempt the whole process and give control to another one. But when our asynchronous application gets CPU time back, it continues from the exact place where it was paused when the system scheduler stepped in. This is why coroutines are still considered non-preemptive.

In the next section, we will take a look at the async and await keywords, which are the backbone of cooperative multitasking in Python.

Python async and await keywords

The async and await keywords are the main building blocks in Python asynchronous programming.

The async keyword, when used before the def statement, defines a new coroutine. The execution of a coroutine function may be suspended and resumed in strictly defined circumstances. Its syntax and behavior are very similar to those of generators. In fact, in older versions of Python, generators had to be used whenever you wanted to implement coroutines. Here is an example of a function declaration that uses the async keyword:

async def async_hello(): 
    print("hello, world!") 

Functions defined with the async keyword are special. When called, they do not execute the code inside, but instead return a coroutine object. Consider the following example from an interactive Python session:

>>> async def async_hello():
...     print("hello, world!")
... 
>>> async_hello()
<coroutine object async_hello at 0x1014129e8>

The coroutine object does not do anything until its execution is scheduled in the event loop. The asyncio module provides the basic event loop implementation, as well as a lot of other asynchronous utilities. The following example shows how to manually schedule a coroutine's execution in an interactive Python session:

>>> import asyncio
>>> async def async_hello():
...     print("hello, world!")
... 
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(async_hello())
hello, world!
>>> loop.close()

Obviously, since we have created only one simple coroutine, there is no concurrency involved in our program. In order to see something that is actually concurrent, we need to create more tasks that will be executed by the event loop.

New tasks can be added to the loop by calling the loop.create_task() method or by providing an "awaitable" object to the asyncio.wait() function. If you have multiple tasks or coroutines to wait for, you can use asyncio.gather() to aggregate them into a single object. We will use the latter approach and try to asynchronously print a sequence of numbers that's been generated with the range() function, as follows:

import asyncio 
import random
 
async def print_number(number):
    await asyncio.sleep(random.random())
    print(number) 
 
 
if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
 
    loop.run_until_complete( 
        asyncio.gather(*[ 
            print_number(number) 
            for number in range(10) 
        ]) 
    ) 
    loop.close()

Let's save our script in an async_print.py file and see how it works:

$ python async_print.py

The output you will see may look as follows:

0
7
8
3
9
4
1
5
2
6

The asyncio.gather() function accepts multiple coroutine objects and returns immediately. It accepts a variable number of positional arguments. That's why we used the argument unpacking syntax (the * operator) to unpack the list of coroutines as arguments. As the name suggests, asyncio.gather() is used to gather multiple coroutines to execute them concurrently. The result is an object that represents a future result (a so-called future) of running all of the provided coroutines. The loop.run_until_complete() method runs the event loop until the given future is completed.
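
For completeness, here is a minimal sketch of the alternative approach mentioned earlier: scheduling each coroutine explicitly with loop.create_task() and waiting for all the resulting tasks with asyncio.wait(). It reuses the print_number() coroutine from the script above:

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(print_number(number))
    for number in range(10)
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()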

We used asyncio.sleep(random.random()) to emphasize the asynchronous operation of coroutines. Thanks to this, coroutines can interweave with each other.

We couldn't achieve the same result (that is, the interweaving of coroutines) with an ordinary time.sleep() function. Coroutines can start to interweave when they release control of execution. This is done through the await keyword. It suspends the execution of the coroutine that is waiting for the results of another coroutine or future.

Whenever a function awaits, it releases the control over execution to the event loop. To better understand how this works, we need to review a more complex example of code.

Let's say we want to create two coroutines that will both perform the same simple task in a loop:

  • Wait a random number of seconds
  • Print some text provided as an argument and the amount of time spent in sleep

Let's start with the following simple implementation that does not use the await keyword:

import time
import random
async def waiter(name): 
    for _ in range(4): 
        time_to_sleep = random.randint(1, 3) / 4 
        time.sleep(time_to_sleep) 
        print(f"{name} waited { time_to_sleep } seconds") 

We can schedule the execution of multiple waiter() coroutines using asyncio.gather() the same way as we did in the async_print.py script:

import asyncio
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        asyncio.gather(waiter("first"), waiter("second"))
    )
    loop.close()

Let's save the code in the waiters.py file and see how those two waiter() coroutines execute in the event loop:

$ time python3 waiters.py

Note that we've used the time utility to measure total execution time. The preceding execution can give the following output:

first waited 0.25 seconds
first waited 0.75 seconds
first waited 0.5 seconds
first waited 0.25 seconds
second waited 0.75 seconds
second waited 0.5 seconds
second waited 0.5 seconds
second waited 0.75 seconds
real    0m4.337s
user    0m0.050s
sys     0m0.014s

As we can see, both coroutines completed their execution, but not in an asynchronous manner. The reason is that they both use the time.sleep() function, which blocks the thread without releasing control to the event loop. This would work better in a multithreaded setup, but we don't want to use threads now. So, how can we fix this?

The answer is to use asyncio.sleep(), which is the asynchronous version of time.sleep(), and await its result using the await keyword. Let's see the following improved version of the waiter() coroutine, which uses the await asyncio.sleep() statement:

async def waiter(name):
    for _ in range(4):
        time_to_sleep = random.randint(1, 3) / 4
        await asyncio.sleep(time_to_sleep)
        print(f"{name} waited {time_to_sleep} seconds")

If we save a modified version of this script in the waiters_await.py file and execute it in the shell, we will hopefully see how the outputs of the two functions interweave with each other:

$ time python waiters_await.py

The output you will see should look something like the following:

first waited 0.5 seconds
second waited 0.75 seconds
second waited 0.25 seconds
first waited 0.75 seconds
second waited 0.75 seconds
first waited 0.75 seconds
second waited 0.75 seconds
first waited 0.5 seconds
real    0m2.589s
user    0m0.053s
sys     0m0.016s

The additional advantage of this simple improvement is that the code ran faster. The overall execution time was less than the sum of all sleeping times because coroutines were cooperatively releasing the control.

Let's take a look at a more practical example of asynchronous programming in the next section.

A practical example of asynchronous programming

As we have already mentioned multiple times in this chapter, asynchronous programming is a great tool for handling I/O-bound operations. So, it's time to build something more practical than a simple printing of sequences or asynchronous waiting.

For the sake of consistency, we will try to handle the same problem that we solved previously with the help of multithreading and multiprocessing. So, we will try to asynchronously fetch some information about current currency exchange rates from an external resource through a network connection. It would be great if we could use the same requests library as in the previous sections. Unfortunately, we can't do so. Or to be more precise, we can't do so effectively.

Unfortunately, the requests library does not support asynchronous I/O with the async and await keywords. There are some other projects that aim to provide some concurrency to the requests project, but they either rely on Gevent (like grequests, available at https://github.com/kennethreitz/grequests) or thread/process pool execution (like requests-futures, available at https://github.com/ross/requests-futures). Neither of these solves our problem.

Knowing the limitations of the library that was so easy to use in our previous examples, we need to build something that fills the gap. The foreign exchange rates API is really simple to use, so we just need a natively asynchronous HTTP library for the job. The standard library of Python 3.9 still lacks anything that would make asynchronous HTTP requests as simple as calling urllib.request.urlopen(). We definitely don't want to build whole protocol support from scratch, so we will use a little help from the aiohttp package, which is available on PyPI. It's a really promising library that provides both client and server implementations for asynchronous HTTP. Here is a small module built on top of aiohttp that creates a single get_rates() helper function that makes requests to the foreign exchange rates API service:

import aiohttp
async def get_rates(session: aiohttp.ClientSession, base: str):
    async with session.get(
        f"https://api.vatcomply.com/rates?base={base}"
    ) as response:
        rates = (await response.json())['rates']
        rates[base] = 1.
        return base, rates

We will save that module in the asyncrates.py file so later we will be able to import it as the asyncrates module.

Now, we are ready to rewrite the example used when we discussed multithreading and multiprocessing. Previously, we split the whole operation into the following two separate steps:

  • Perform all requests to an external service concurrently using the asyncrates.get_rates() function
  • Display all the results in a loop using the present_result() function

The core of our program will be a simple main() function that gathers results from multiple get_rates() coroutines and passes them to the present_result() function:

async def main():
    async with aiohttp.ClientSession() as session:
        for result in await asyncio.gather(*[
            get_rates(session, base)
            for base in BASES
        ]):
            present_result(*result)

And the full code, together with imports and event loop initialization, will be as follows:

import asyncio
import time
import aiohttp
from asyncrates import get_rates
SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
def present_result(base, rates):
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")
async def main():
    async with aiohttp.ClientSession() as session:
        for result in await asyncio.gather(*[
            get_rates(session, base)
            for base in BASES
        ]):
            present_result(*result)
if __name__ == "__main__":
    started = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    elapsed = time.time() - started
    print()
    print("time elapsed: {:.2f}s".format(elapsed))

The output of running this program will be similar to the output of versions that relied on multithreading and multiprocessing:

$ python async_aiohttp.py
1 USD =     1.0 USD,   0.835 EUR,    3.81 PLN,    8.39 NOK,    21.7 CZK
1 EUR =     1.2 USD,     1.0 EUR,    4.56 PLN,    10.0 NOK,    25.9 CZK
1 PLN =   0.263 USD,    0.22 EUR,     1.0 PLN,     2.2 NOK,    5.69 CZK
1 NOK =   0.119 USD,  0.0996 EUR,   0.454 PLN,     1.0 NOK,    2.58 CZK
1 CZK =  0.0461 USD,  0.0385 EUR,   0.176 PLN,   0.387 NOK,     1.0 CZK
time elapsed: 0.33s

The advantage of using asyncio over multithreading and multiprocessing is that we didn't have to deal with process pools and memory safety to achieve concurrent network communication. The downside is that we couldn't use a popular synchronous communication library like the requests package. We used aiohttp instead, and that's fairly easy for a simple API. But sometimes, you need a specialized client library that isn't asynchronous and cannot be easily ported. We will cover such a situation in the next section.

Integrating non-asynchronous code with async using futures

Asynchronous programming is great, especially for backend developers interested in building scalable applications. In practice, it is one of the most important tools for building highly concurrent servers.

But the reality is painful. A lot of popular packages that deal with I/O-bound problems are not meant to be used with asynchronous code. The main reasons for that are as follows:

  • The low adoption of advanced Python 3 features (especially asynchronous programming)
  • The limited understanding of various concurrency concepts among Python beginners

This means that often, the migration of existing synchronous multithreaded applications and packages is either impossible (due to architectural constraints) or too expensive. A lot of projects could benefit greatly from incorporating the asynchronous style of multitasking, but only a few of them will eventually do that.

This means that right now, you will experience a lot of difficulties when trying to build asynchronous applications from scratch. In most cases, this will be something similar to the problem of the requests library mentioned in the A practical example of asynchronous programming section—incompatible interfaces and the synchronous blocking of I/O operations.

Of course, you can sometimes forgo await when you experience such incompatibility and just fetch the required resources synchronously. But this will block every other coroutine from executing while you wait for the results. It technically works, but it also ruins all the gains of asynchronous programming. So, in the end, mixing asynchronous I/O with synchronous I/O is not an option. It is kind of an all-or-nothing game.

The other problem is long-running CPU-bound operations. When you are performing an I/O operation, it is not a problem to release control from a coroutine. When writing to or reading from a socket, you will eventually wait, so using await is the best you can do. But what should you do when you need to actually compute something, and you know it will take a while? You can, of course, slice the problem into parts and release control with await asyncio.sleep(0) every time you move the work forward a bit. But you will shortly find that this is not a good pattern. Such a thing can make the code a mess, and it does not guarantee good results. Time slicing should be the responsibility of the interpreter or the OS.

So, what should you do if you have some code that makes long synchronous I/O operations that you can't or are unwilling to rewrite? Or what should you do when you have to make some heavy CPU-bound operations in an application designed mostly with asynchronous I/O in mind? Well... you need to use a workaround. And by a workaround, I mean multithreading or multiprocessing.

This may not sound obvious, but sometimes the best solution may be the one that we tried to escape from. Parallel processing of CPU-intensive tasks in Python is always better with multiprocessing. And multithreading can handle I/O operations just as well (quickly and without much resource overhead) as async and await, if you set it up properly and handle it with care.

So, when something simply does not fit your asynchronous application, use a piece of code that will defer it to a separate thread or process. You can pretend that this was a coroutine and release control to the event loop using await. You will eventually process results when they are ready. Fortunately for us, the Python standard library provides the concurrent.futures module, which is also integrated with the asyncio module. These two modules together allow you to schedule blocking functions to execute in threads or additional processes as if they were asynchronous non-blocking coroutines.

Let's take a closer look at executors and futures in the next section.

Executors and futures

Before we see how to inject threads or processes into an asynchronous event loop, we will take a closer look at the concurrent.futures module, which will later be the main ingredient of our so-called workaround. The most important classes in the concurrent.futures module are Executor and Future.

Executor represents a pool of resources that may process work items in parallel. This may seem very similar in purpose to classes from the multiprocessing module—Pool and dummy.Pool—but it has a completely different interface and semantics. The Executor class is a base class not intended for instantiation and has the following two concrete implementations:

  • ThreadPoolExecutor: This is the one that represents a pool of threads
  • ProcessPoolExecutor: This is the one that represents a pool of processes

Every executor provides the following three methods:

  • submit(func, *args, **kwargs): This schedules the func function for execution in a pool of resources and returns the Future object representing the execution of a callable
  • map(func, *iterables, timeout=None, chunksize=1): This executes the func function over an iterable in a similar way to the multiprocessing.Pool.map() method
  • shutdown(wait=True): This shuts down the executor and frees all of its resources

The most interesting method is submit() because of the Future object it returns. It represents the asynchronous execution of the callable and only indirectly represents its result. In order to obtain the actual return value of the submitted callable, you need to call the Future.result() method. If the callable has already finished, result() will not block and will just return the function output. Otherwise, it will block until the result is ready. Treat it like a promise of a result (it is, in fact, the same concept as a promise in JavaScript). You don't need to unpack it immediately after receiving it (with the result() method), but if you do, it is guaranteed to eventually return something.

Let's consider the following interaction with ThreadPoolExecutor in an interactive Python session:

>>> def loudly_return():
...     print("processing")
...     return 42
...     
>>> from concurrent.futures import ThreadPoolExecutor
>>> with ThreadPoolExecutor(1) as executor:
...     future = executor.submit(loudly_return)
...     
processing
>>> future
<Future at 0x33cbf98 state=finished returned int>
>>> future.result()
42  

As you can see, loudly_return() immediately printed the processing string after it was submitted to the executor. This means that execution started even before we decided to unpack its value using the future.result() method.
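
The map() method is just as straightforward. Continuing the same interactive session, the following short sketch squares a few numbers in the thread pool (the workload is arbitrary):

>>> with ThreadPoolExecutor(2) as executor:
...     list(executor.map(lambda x: x * x, range(5)))
... 
[0, 1, 4, 9, 16]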

In the next section, we'll see how to use executors in an event loop.

Using executors in an event loop

The Future class instances returned by the Executor.submit() method are conceptually very close to the coroutines used in asynchronous programming. This is why we can use executors to make a hybrid between cooperative multitasking and multiprocessing or multithreading.

The core of this workaround is the run_in_executor(executor, func, *args) method of the event loop class. It allows you to schedule the execution of the func function in the process or thread pool represented by the executor argument. The most important thing about that method is that it returns a new awaitable (an object that can be awaited with the await statement). So, thanks to this, you can execute a blocking function that is not a coroutine exactly as if it were a coroutine. Most importantly, it will not block the event loop from processing other coroutines, no matter how long the function takes to finish. Only the coroutine that awaits the result of such a call is suspended; the whole event loop keeps spinning.

A useful fact is that you don't even need to create your own executor instance. If you pass None as the executor argument, the default ThreadPoolExecutor will be used (since Python 3.8, its default number of threads is min(32, os.cpu_count() + 4)).

So, let's assume that we did not want to rewrite the problematic part of our API-facing code that was the cause of our headache. We can easily defer the blocking call to a separate thread with the loop.run_in_executor() call, while still leaving the fetch_rates() function as an awaitable coroutine, as follows:

async def fetch_rates(base):
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(
        None, requests.get,
        f"https://api.vatcomply.com/rates?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates

Such a solution is not as good as having a fully asynchronous library to do the job, but half a loaf is better than none.
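
For completeness, here is a minimal sketch of a driver for that coroutine, assuming the BASES tuple and the present_result() function from our earlier examples:

async def main():
    results = await asyncio.gather(*[
        fetch_rates(base) for base in BASES
    ])
    for result in results:
        present_result(*result)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())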

Asynchronous programming is a great tool for building performant concurrent applications that have to communicate a lot with other services over the network. You can do that easily without all the memory safety problems that usually come with multithreading and (to some extent) multiprocessing. A lack of involuntary context switching also reduces the number of necessary locking primitives because it is easy to predict when coroutines return control to the event loop.

Unfortunately, that comes at the cost of having to use dedicated asynchronous libraries. Synchronous and threaded applications usually have better coverage of client and communication libraries for interacting with popular services. Executors and futures allow you to fill that gap but are less optimal than native asynchronous solutions.

Summary

It was a long journey, but we successfully struggled through most of the common approaches to concurrent programming that are available for Python programmers.

After explaining what concurrency really is, we jumped into action and dissected one of the typical concurrent problems with the help of multithreading. After identifying the basic deficiencies of our code and fixing them, we turned to multiprocessing to see how it would work in our case. We found that multiple processes with the multiprocessing module are a lot easier to use than plain threads coming with the threading module. But just after that, we realized that we can use the same API for threads too, thanks to the multiprocessing.dummy module. So, the decision between multiprocessing and multithreading is now only a matter of which solution better suits the problem and not which solution has a better interface.

And speaking about problem fit, we finally tried asynchronous programming, which should be the best solution for I/O-bound applications, only to realize that we cannot completely forget about threads and processes. So, we made a circle! Back to the place where we started.

And this leads us to the final conclusion of this chapter. There is no silver bullet. There are some approaches that you may prefer. There are some approaches that may fit better for a given set of problems, but you need to know them all in order to be successful. In real-life scenarios, you may find yourself using the whole arsenal of concurrency tools and styles in a single application, and this is not uncommon.

In the next chapter, we will take a look at a topic somewhat related to concurrency: event-driven programming. In that chapter, we will be concentrating on various communication patterns that form the backbone of distributed asynchronous and highly concurrent systems.
